RocketMQ 领域模型

  • Apache RocketMQ (opens new window) 是一个起源于阿里巴巴的开源分布式消息中间件和流数据处理平台。

  • 一个 Topic 在物理上可以被分成多个 Queue,分布在不同的 Broker 上。Queue 是消息存储和并行消费的最小单元。发送和消费的并行度都与 Queue 的数量直接相关。

  • 生产者向指定 Topic 发送消息,消费者订阅指定 Topic 来消费消息。

  • 一条消息只会被同一个消费者组内的一个消费者消费。

# 工作流程

  1. Broker 启动后,会向所有 Name Server 注册自己的路由信息(包含自己负责哪些 Topic 等)
  2. Producer 启动时,会从 Name Server 获取它要发送的 Topic 的路由信息(即该 Topic 的 Queue 分布在哪些 Broker 上)
  3. Producer 根据负载均衡策略,选择一条 Queue,将消息发送到对应的 Broker
  4. Broker 接收到消息后,将其持久化到存储系统(CommitLog)
  5. Consumer 启动后,同样从 Name Server 获取 Topic 的路由信息,并连接到对应的 Broker
  6. Consumer 从 Broker 拉取消息,进行消费,消费成功后向 Broker 返回一个确认(ACK);如果消费失败,消息会被重投

# 消息类型

  • 支持的消息类型有:普通消息 Normal、顺序消息 FIFO、定时/延时消息 Delay、事务消息 Transaction

# 普通消息

# 顺序消息

  • 消费者按照发送消息的先后顺序获取消息

# 4.x 版本

  • 对于顺序消息,所有消息按照 Sharding Key 进行区块分区,同一分区内的消息将按照先进先出(FIFO)原则进行消费。同一分区内的消息保证顺序性,不同分区的消息消费顺序不做要求。
  • 实现方式:
    1. 生产者将同一组需要顺序处理的消息发送到同一个消息队列中(使用 MessageQueueSelector 选择队列)
    2. 消费者以顺序模式消费消息,consumeMode = ConsumeMode.ORDERLY
  • 在顺序消费模式下,对某个队列的消费是串行的,一个队列在同一时刻只会被分配给消费者组内的一个消费实例中的一个线程来进行消费。因此,系统的总吞吐量取决于 Topic 的队列数量。
  • 在顺序消费模式下,消费线程数应设置为略大于单个消费者实例可能分配到的最大队列数。

# 5.x 版本

  • 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

  • 相同消息组的消息按照先后顺序被存储在同一个队列;不同消息组的消息可以混合在同一个队列中,且不保证连续

  • 保证消息的顺序性

    • 生产顺序性:同一消息组、单一生产者、串行发送

    • 消费顺序性:投递顺序、顺序消费、有限重试

      消费者类型为 PushConsumer 时,RocketMQ 保证消息按照存储顺序一条一条投递给消费者;

      消费者类型为 SimpleConsumer 时,则消费者有可能一次拉取多条消息,此时,消息消费的顺序性需要由业务方自行保证。

  • 建议:串行消费,避免批量消费导致乱序;消息组尽可能打散,避免集中导致热点

# 定时/延时消息

  • 消息被发送至服务端后,在指定时间后才能被消费者消费

# 事务消息

  • 在分布式场景下保障消息生产本地事务的最终一致性
  • 处理流程:
    1. 生产者发送消息,为半事务消息
    2. 生产者开始执行本地事务逻辑
    3. 生产者根据本地事务执行结果向服务端提交二次确认结果

# 生产者负载均衡 (opens new window)

  • 对于非顺序消息(普通消息、定时/延时消息、事务消息)场景,使用 RoundRobin 模式的负载均衡策略
  • 对于顺序消息场景,使用 MessageGroupHash 模式的负载均衡策略
  • RoundRobin 模式:生产者发送消息时,以消息为粒度,按照轮询方式将消息依次发送到指定主题中的所有可写目标队列中,保证消息尽可能均衡地分布到所有队列
  • MessageGroupHash 模式:生产者发送消息时,以消息组为粒度,按照内置的 Hash 算法(SipHash),将相同消息组的消息分配到同一队列中,保证同一消息组的消息按照发送的先后顺序存储

4.x 版本:

  1. 无序消息(普通消息、事务消息、定时和延时消息):Producer 将消息以轮询的方式发送至 Queue
  2. 顺序消息:相同 Sharding Key 的消息被发送至同一个Queue

# 消费者负载均衡

  • 同一条消息支持被多个消费者分组订阅,同时,对于每个消费者分组可以初始化多个消费者。消费效果:

    1. 消费组间广播消费 :每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果。
    2. 消费组内共享消费 :每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载。
  • 在消费组内共享消费场景下,

    1. 对于 PushConsumer 和 SimpleConsumer 类型的消费者,使用消息粒度负载均衡策略
    2. 对于历史版本(服务端 4.x/3.x 版本)的消费者、PullConsumer 类型的消费者,使用队列粒度负载均衡策略
  • 消息粒度负载均衡:同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费

    • 顺序消息负载机制:顺序消息场景下,不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。在消费过程中,前面的消息 M1、M2 被消费者 Consumer A1 处理时,只要消费状态没有提交,消费者 A2 是无法并行消费后续的 M3、M4 消息的,必须等前面的消息提交消费状态后才能消费后面的消息。

      顺序消息负载机制
  • 队列粒度负载均衡:同一消费者分组内的多个消费者将按照队列粒度消费消息,即每个队列仅被一个消费者消费

# 消费者分类

  • RocketMQ 既提供了 Push 模式 也提供了 Pull 模式

  • RocketMQ 的默认消费模式(PushConsumer)是基于 Pull 模式的长轮询实现的,但对开发者呈现为 Push 的编程模型

# PushConsumer

  • 在消费者初始化时注册一个消费监听器,并在消费监听器内部实现消息处理逻辑
  • 由 RocketMQ 的 SDK 在后台完成消息获取、触发监听器调用以及进行消息重试处理
  • 内部原理:SDK 内置了一个长轮询线程,先将消息异步拉取到 SDK 内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑
RocketMQ PushConsumer

# SimpleConsumer

  • 业务方主动拉取消息,并主动调用接口返回消费结果

# PullConsumer

  • 业务方自行按队列拉取消息
  • 仅推荐流处理场景使用

# 消费重试

  • 若 Consumer 消费某条消息失败,RocketMQ 会自动进行消息重试,若达到最大重试次数后消费依然失败,则消息将被投递至死信队列。
  • 死信队列是死信 Topic 下分区数唯一的单独队列。
  • ConsumerGroup 的死信 Topic 名称为 %DLQ%ConsumerGroupName,死信队列的消息不会再被消费。

# 基本最佳实践 (opens new window)

# 生产者(Producer)

# Tag 的使用

  • 目的:用于消息过滤。消费者可以订阅指定 Tag 来筛选消息。
  • 建议:一个应用尽量使用一个 Topic,用 Tags 来标识消息的子类型。只有发送时设置了 Tag,消费时才能按 Tag 过滤。

# Keys 的使用

  • 目的:用于消息定位和排查问题(如消息丢失)。
  • 建议:将消息映射到业务的唯一标识(如订单 ID、用户 ID),并设置到 keys 字段。服务器会为其创建哈希索引,便于查询。key 应尽可能唯一以避免哈希冲突。

# 日志打印

  • 发送成功或失败都应打印消息日志,便于业务排查。只要 send 方法不抛异常,即代表发送成功。

# 消息发送失败处理

  • Producer 的 send 方法本身支持内部重试。
  • 如果业务要求绝对不丢消息,建议的兜底方案是:发送失败时,将消息持久化到 db,然后由后台线程定时重试,确保消息最终送达 Broker。

# 消费者(Consumer)

# 消费幂等性

  • 核心:RocketMQ 保证至少投递一次(At-Least-Once),无法避免消息重复(例如因重试机制导致)。
  • 要求:业务层面必须自行实现幂等消费。
  • 方法:利用关系数据库进行去重。确定消息的唯一键(可以是 msgId,但更推荐业务唯一标识如订单 ID),通过尝试插入并捕获主键冲突来判断是否已消费。

# 处理消费速度慢

  • 提高消费并行度

    • 增加同一个 ConsumerGroup 下的 Consumer 实例数量(加机器或单机多进程)。
    • 增加单个 Consumer 的消费线程数(5.x SDK 通过 setConsumptionThreadCount 设置)。
  • 批量方式消费

    • 如果业务支持,批量处理消息能极大提高吞吐量(例如处理 10 个订单的耗时远小于串行处理 10 次)。建议使用 5.x SDK 的 SimpleConsumer 并设置批次大小。
  • 跳过非重要消息

    • 当消息堆积严重且业务允许时,可使用重置消费位点的功能,直接跳过(丢弃)指定时刻之前的不重要消息。
  • 优化单条消息消费过程

    • 重点减少 IO 操作(如数据库请求)次数。合并数据库查询或更新操作能显著降低消费耗时。
    • 对延迟敏感的应用,可将数据库部署在 SSD 硬盘来降低 IO 响应时间。

# 消费打印日志

  • 消息量少时,建议打印消息内容及消费耗时,方便排查问题。
  • 高 TPS 环境下,需谨慎开启,避免日志输出影响性能。
Updated at: 2025-10-04 10:44:20